In [ ]:
import apache_beam as beam
from apache_beam.runners.interactive import interactive_runner
from apache_beam.runners.portability import flink_runner
# Assemble the pipeline step by step: a Flink runner does the actual
# execution, wrapped by the interactive runner that the pipeline is bound to.
flink_backend = flink_runner.FlinkRunner()
notebook_runner = interactive_runner.InteractiveRunner(
    underlying_runner=flink_backend,
)
p = beam.Pipeline(notebook_runner)
In [ ]:
# Source: the integers 0 through 9.
init_pcoll = p | beam.Create(range(10))

# Two labelled element-wise transforms that both branch off the same source.
square_step = 'Square' >> beam.Map(lambda x: x * x)
squares = init_pcoll | square_step

cube_step = 'Cube' >> beam.Map(lambda x: x ** 3)
cubes = init_pcoll | cube_step

# Execute what has been built so far and block until the run completes.
result = p.run()
result.wait_until_finish()
In [ ]:
# Look up the `squares` PCollection on the result of the last run. This is
# the cell's final expression, so its value is what the notebook shows
# (presumably the computed squares -- confirm against the runner's API).
result.get(squares)
In [ ]:
class AverageFn(beam.CombineFn):
    """Combiner that computes the arithmetic mean of its inputs.

    The accumulator is a ``(running_sum, element_count)`` tuple.
    """

    def create_accumulator(self):
        """Return the empty accumulator: a zero sum and a zero count."""
        return (0.0, 0)

    def add_input(self, sum_count, input):
        """Fold one input value into ``sum_count`` and return the new tuple.

        The parameter name ``input`` is kept for interface compatibility even
        though it shadows the builtin inside this method.
        """
        # Local is deliberately not called `sum`, so the builtin stays usable.
        total, count = sum_count
        return total + input, count + 1

    def merge_accumulators(self, accumulators):
        """Merge several partial accumulators into a single one.

        Returns the empty accumulator when ``accumulators`` yields nothing,
        instead of failing while unpacking an empty ``zip``.
        """
        # Materialize first: the argument may be a one-shot iterable.
        partials = list(accumulators)
        if not partials:
            return self.create_accumulator()
        sums, counts = zip(*partials)
        return sum(sums), sum(counts)

    def extract_output(self, sum_count):
        """Return the mean, or NaN when no elements were accumulated."""
        total, count = sum_count
        return total / count if count else float('NaN')
In [ ]:
# Reduce each branch to a single global value with the AverageFn combiner.
average_square = squares | 'Average Square' >> beam.CombineGlobally(AverageFn())
average_cube = cubes | 'Average Cube' >> beam.CombineGlobally(AverageFn())

# Run the extended pipeline and block until it finishes, so that values read
# from `result` afterwards come from a completed run.
result = p.run()
result.wait_until_finish()
In [ ]:
# Look up the `average_square` PCollection on the result of the last run.
# This is the cell's final expression, so its value is what the notebook
# shows (presumably the mean of the squares -- confirm against the runner).
result.get(average_square)